
Conversation

@openinx
Member

@openinx openinx commented Nov 20, 2020

This patch ports the Flink streaming reader from https://github.com/generic-datalake/iceberg-poc/pull/3.

@openinx openinx marked this pull request as draft November 20, 2020 13:34
@github-actions github-actions bot added the flink label Nov 20, 2020
ScanContext options) {
Preconditions.checkArgument(
options.snapshotId() == null && options.asOfTimestamp() == null,
"The streaming reader does not support using snapshot-id or as-of-timestamp to select the table snapshot.");
Contributor

It would be good to include what _should_ be used to configure the stream. Looks like it should be startSnapshotId with no endSnapshotId.

Member Author

@openinx openinx Dec 4, 2020

If the user does not provide a start-snapshot-id, this function uses the snapshot id before the current snapshot (a DUMMY_START_SNAPSHOT_ID in the code) as the start-snapshot-id. So we allow a null start-snapshot-id, but we do not allow a non-null end-snapshot-id.

Contributor

We should probably also check this:

    Preconditions.checkArgument(scanContext.asOfTimestamp() == null,
        "Can't set asOfTimestamp in ScanContext for continuous enumerator");

Member Author

The current condition ctxt.snapshotId() == null && ctxt.asOfTimestamp() == null already requires both snapshotId and asOfTimestamp to be null, so we don't need an extra scanContext.asOfTimestamp() == null check now. But I'd prefer separate Preconditions.checkArgument calls for snapshotId and asOfTimestamp, so that people can distinguish the cause quickly.
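A sketch of the separated checks might look like the following. The checkArgument helper mirrors Guava's Preconditions so the example is self-contained, and the names and messages are illustrative rather than the actual patch code:

```java
// Sketch: splitting the combined condition into two checks so each failure
// produces a distinct message. checkArgument mirrors Guava's Preconditions;
// this is an illustration, not the code in the PR.
public class SnapshotOptionChecks {
  static void checkArgument(boolean condition, String message) {
    if (!condition) {
      throw new IllegalArgumentException(message);
    }
  }

  static void validate(Long snapshotId, Long asOfTimestamp) {
    checkArgument(snapshotId == null,
        "The streaming reader does not support using snapshot-id to select the table snapshot.");
    checkArgument(asOfTimestamp == null,
        "The streaming reader does not support using as-of-timestamp to select the table snapshot.");
  }

  public static void main(String[] args) {
    validate(null, null); // ok: neither option is set
    try {
      validate(123L, null);
    } catch (IllegalArgumentException e) {
      // The message now names snapshot-id specifically.
      System.out.println(e.getMessage());
    }
  }
}
```

With two checks, a user who set only as-of-timestamp gets an error that names that option directly instead of a combined message.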

@rdblue
Contributor

rdblue commented Nov 24, 2020

Looking pretty good to me, at least for the Iceberg concerns. It would be great to have @JingsongLi and @stevenzwu review as well.


import static org.apache.iceberg.types.Types.NestedField.required;

public class TestStreamScanSql extends AbstractTestBase {
Member Author

I think it's good to make this unit test extend the FlinkTestBase class.

@openinx openinx self-assigned this Jan 5, 2021
@openinx openinx marked this pull request as ready for review January 5, 2021 10:11
@openinx
Member Author

openinx commented Jan 5, 2021

@rdblue @stevenzwu Would you mind taking another look? I've updated this patch. Thanks.

public void initializeState(StateInitializationContext context) throws Exception {
super.initializeState(context);

checkpointState = context.getOperatorStateStore().getListState(
Contributor

@stevenzwu stevenzwu Jan 6, 2021

I opened issue-1698 a while back regarding a more stable serializer (than Java serialization) for Flink checkpointing. While Java serialization works well for batch jobs, we need a more stable serialization format to support schema evolution in long-running streaming jobs. We need something like DeltaManifestsSerializer and ManifestFiles.encode for CombinedScanTask.

It doesn't have to be done in the initial version. So I don't think it is a blocker for this PR.
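To make the idea concrete, a versioned binary layout along the lines of DeltaManifestsSerializer could look like this. All names here are illustrative, not Iceberg's actual API; the point is the version prefix that lets a long-running job evolve the format:

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

// Sketch of a version-prefixed serializer for checkpointed state. The payload
// is a String stand-in for a real CombinedScanTask encoding; names are
// hypothetical, not Iceberg's API.
public class VersionedSerializer {
  private static final int VERSION_1 = 1;

  static byte[] serialize(String payload) {
    try {
      ByteArrayOutputStream bytes = new ByteArrayOutputStream();
      DataOutputStream out = new DataOutputStream(bytes);
      out.writeInt(VERSION_1); // version prefix enables future format evolution
      out.writeUTF(payload);
      return bytes.toByteArray();
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }

  static String deserialize(byte[] data) {
    try {
      DataInputStream in = new DataInputStream(new ByteArrayInputStream(data));
      int version = in.readInt();
      switch (version) {
        case VERSION_1:
          return in.readUTF();
        default:
          throw new IOException("Unknown serializer version: " + version);
      }
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }
}
```

A v2 format can then be added as a new case without breaking the ability to restore state written by v1.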

Member Author

Sorry, I missed this issue before. Let me take a look.

super.snapshotState(context);

checkpointState.clear();
for (FlinkInputSplit split : splits) {
Contributor

Should we use addAll instead, which translates to one merge call in RocksDB?
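The difference can be sketched with a minimal stand-in for ListState that counts state-backend calls. MockListState is hypothetical; the real interface is org.apache.flink.api.common.state.ListState, whose addAll takes the whole batch at once:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Minimal stand-in for Flink's ListState that counts backend calls, to
// illustrate per-element add vs a single addAll. Hypothetical mock, not the
// real org.apache.flink.api.common.state.ListState.
class MockListState<T> {
  final List<T> values = new ArrayList<>();
  int backendCalls = 0;

  void add(T value) {            // one backend call per element
    values.add(value);
    backendCalls++;
  }

  void addAll(List<T> batch) {   // a single backend call for the whole batch
    values.addAll(batch);
    backendCalls++;
  }

  void clear() {
    values.clear();
  }
}

public class SnapshotStateSketch {
  public static void main(String[] args) {
    List<String> splits = Arrays.asList("split-1", "split-2", "split-3");

    MockListState<String> perElement = new MockListState<>();
    for (String split : splits) {
      perElement.add(split);     // original pattern: N calls
    }

    MockListState<String> batched = new MockListState<>();
    batched.addAll(new ArrayList<>(splits)); // suggested pattern: 1 call

    System.out.println(perElement.backendCalls + " calls vs " + batched.backendCalls);
  }
}
```

With a RocksDB backend, each add is a separate merge, so batching the splits into one addAll reduces snapshot overhead as the split count grows.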

@openinx
Member Author

openinx commented Jan 11, 2021

@rdblue, after talking with some Flink users in Asia, I found they have a strong demand for this feature because it's necessary for building a classic data pipeline: (kafka) -> (flink) -> (iceberg) -> (flink) -> (iceberg) -> ... . I think it would be good to merge this before the Iceberg 0.11.0 release. Would you like to take another look when you have time? Thanks.

@rdblue
Contributor

rdblue commented Jan 11, 2021

@openinx, I will take a look. Sorry I didn't get a chance to finish the review I started a few days ago. I agree that this is important to get in and will make some time for it this week.


private static final Logger LOG = LoggerFactory.getLogger(StreamingReaderOperator.class);

private final MailboxExecutor executor;
Contributor

I don't think that using MailboxExecutor is a good idea. If I understand correctly, the mailbox queue for the executor cannot be used to hold state because that state would not be checkpointed (if, for example, it held the splits waiting to be processed). The result is that this operator has an elaborate way to keep state in a queue and continuously submit stateless mailbox tasks to keep running. But a simpler option is to create a thread directly that polls the splits queue and keeps running endlessly.

Member Author

@openinx openinx Jan 13, 2021

We cannot use a newly created thread to process splits asynchronously, because that would break the checkpoint mechanism, which depends on the mailbox model in the Flink runtime. Assume the asynchronous thread keeps processing the records of a newly polled split, and a Flink checkpoint barrier arrives. How would we coordinate the checkpoint barrier with the in-progress split so that the barrier takes effect (triggering persistence of all of this operator's state) once the current split is finished? Would we go back to using the checkpoint lock to synchronize between the checkpoint barrier event and the newly introduced async thread?

In the current mailbox model, both Flink's internal control actions and user-provided events are processed in the same thread (the StreamTask in the Flink runtime). The StreamTask runs an endless loop to process events from the mailbox queue (see the code here). In each iteration it will:

Step 1. Take any mails that have been enqueued in the mailbox queue (code). Those mails are Flink's control actions, such as the action to trigger a checkpoint or to notify checkpoint completion. If no mail is enqueued in the mailbox queue, processMail does nothing.

Step 2. Read one completed record from Flink's network queue and invoke processElement(record); that is the incremental computation. Take sum as an example: each time a record arrives, processElement increments its counter.

So all events (whether Flink's control events or the user's events) are processed in the same StreamTask thread. In our Flink streaming reader, we only need to ensure that a single split is being processed at a time; a newly triggered checkpoint action can then simply be enqueued in the mailbox queue. Once the in-progress split finishes, the StreamTask will execute the checkpoint action in step 1. We don't need any extra checkpoint lock or extra synchronization.

That's why we use the mailbox to enqueue the action (processSplits) that processes all the elements of a given split.
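The single-threaded serialization described above can be illustrated with the mailbox modeled as a plain queue drained by one loop. This is a simplified sketch, not Flink's actual StreamTask or MailboxExecutor:

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

// Simplified illustration of the mailbox model: control actions (checkpoints)
// and split-processing tasks are drained by the same single-threaded loop, so
// a checkpoint can never interleave in the middle of a split. A sketch only;
// Flink's real mailbox lives inside StreamTask.
public class MailboxSketch {
  public static void main(String[] args) {
    Queue<Runnable> mailbox = new ArrayDeque<>();
    List<String> order = new ArrayList<>();

    // One "processSplits" task reads a whole split before returning...
    mailbox.add(() -> order.add("split-1 fully processed"));
    // ...so a checkpoint action enqueued meanwhile simply waits its turn.
    mailbox.add(() -> order.add("checkpoint state persisted"));
    mailbox.add(() -> order.add("split-2 fully processed"));

    // The StreamTask-style loop: drain mails one at a time on one thread.
    while (!mailbox.isEmpty()) {
      mailbox.poll().run();
    }

    System.out.println(order);
    // [split-1 fully processed, checkpoint state persisted, split-2 fully processed]
  }
}
```

Because the checkpoint runs between split tasks on the same thread, it always sees a consistent operator state with no lock.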

Contributor

Okay, I think I understand now. The missing piece of information was that the MailboxExecutor where these tasks are added is the same mailbox executor that is running the operator. Is that correct?

I thought that it was a separate mailbox executor and thread, which wouldn't make much sense. I also noted the problem with checkpoints that you pointed out, but thought that it must be handled within the mailbox.

If it is the same executor, then it makes sense because the same thread processing incoming checkpoint events will also run processSplits. I think I see what you're doing now.

Contributor

If this is the same mailbox executor that is running the operator, then it would be helpful to add a comment here that says it for future readers.

Member Author

The missing piece of information was that the MailboxExecutor where these tasks are added is the same mailbox executor that is running the operator. Is that correct?

Yes, you're right. Let me add a few comments to explain this more clearly in the code.

@github-actions github-actions bot added the data label Jan 13, 2021
}

private void enqueueProcessSplits() {
if (currentSplitState == SplitState.IDLE) {
Contributor

@rdblue rdblue Jan 14, 2021

Minor: Should this only queue a task if splits is non-empty?

If splits is currently empty and this is called from processSplits, then a new task will be queued. That task will process a split if one is waiting in the mailbox queue to be processed, but often it will do nothing and set the split state back to IDLE.

I'd probably only add a new task if splits is non-empty, or update processSplits to always submit a new task and not set IDLE in the finally block.

Member Author

Yeah, I like this improvement. Will update it in the next patch.
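A sketch of the guarded version might look like this. The names follow the discussion, but the mailbox is modeled as a plain queue, so this is illustrative rather than the actual operator code:

```java
import java.util.ArrayDeque;
import java.util.Queue;

// Sketch of the suggested guard: only submit a read task when a split is
// actually waiting. The mailbox is a plain queue here; in the real operator
// it is the shared MailboxExecutor.
public class EnqueueGuardSketch {
  enum SplitState { IDLE, RUNNING }

  final Queue<String> splits = new ArrayDeque<>();
  final Queue<Runnable> mailbox = new ArrayDeque<>();
  SplitState currentSplitState = SplitState.IDLE;

  void enqueueProcessSplits() {
    // Guard on both IDLE state and a non-empty split queue, so we never
    // enqueue a task that wakes up only to find nothing to do.
    if (currentSplitState == SplitState.IDLE && !splits.isEmpty()) {
      currentSplitState = SplitState.RUNNING;
      mailbox.add(this::processSplits);
    }
  }

  void processSplits() {
    splits.poll();          // read the whole split (reading elided here)
    currentSplitState = SplitState.IDLE;
    enqueueProcessSplits(); // re-submit only if more splits have arrived
  }

  public static void main(String[] args) {
    EnqueueGuardSketch op = new EnqueueGuardSketch();
    op.enqueueProcessSplits(); // no splits yet: nothing is queued
    System.out.println("tasks queued: " + op.mailbox.size()); // 0

    op.splits.add("split-1");
    op.enqueueProcessSplits();
    System.out.println("tasks queued: " + op.mailbox.size()); // 1
  }
}
```

Compared with always re-submitting, this keeps empty no-op tasks out of the mailbox, so checkpoint mails are not preceded by useless work.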

// currentSplitState will be marked as RUNNING). After finished all records processing, the currentSplitState will
// be marked as IDLE again.
// NOTICE: all the reader and writer of this variable are the same thread, so we don't need extra synchronization.
private transient SplitState currentSplitState;
Contributor

I think I would rephrase the comment here. I didn't really understand it when I read it the first time. Here's what I would suggest, assuming that I understand what's happening here:

Splits are read by the same thread that calls processElement. Each read task is submitted to that thread by adding them to the executor. This state is used to ensure that only one read task is in that queue at a time, so that read tasks do not accumulate ahead of checkpoint tasks. When there is a read task in the queue, this is set to RUNNING. When there are no more files to read, this will be set to IDLE.

@rdblue
Contributor

rdblue commented Jan 14, 2021

@openinx, I took another look at this after understanding that the mailbox executor is shared and everything looks good to me. I made a few minor comments you could fix, but assuming that my interpretation of your last comment is correct I think this is ready to commit. Once tests are passing, of course.

@openinx openinx merged commit 14331c4 into apache:master Jan 14, 2021
@openinx
Member Author

openinx commented Jan 14, 2021

All checks passed, so I just merged this PR. Thanks all for reviewing.

@openinx openinx added this to the Java 0.11.0 Release milestone Jan 22, 2021
XuQianJin-Stars pushed a commit to XuQianJin-Stars/iceberg that referenced this pull request Mar 22, 2021